自拓展输入流

您所在的位置:网站首页 flink groupid 自拓展输入流

自拓展输入流

2022-05-12 10:56| 来源: 网络整理| 查看: 265

用户可通过编写代码实现从想要的云生态或者开源生态获取数据,作为Flink作业的输入数据。

语法格式1 2 3 4 5 6 7CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" ) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME 关键字 表1 关键字说明

参数

是否必选

说明

type

数据源类型,"user_defined"表示数据源为用户自定义数据源。

type_class_name

用户实现获取源数据的source类名称,注意包含完整包路径。

type_class_parameter

用户自定义source类的入参,仅支持一个string类型的参数。

注意事项

用户自定义source类需要继承类RichParallelSourceFunction,并指定数据类型为Row例如定义类MySource:public class MySource extends RichParallelSourceFunction{},重点实现其中的open、run和close函数。

依赖pom:

org.apache.flink flink-streaming-java_2.11 ${flink.version} provided org.apache.flink flink-core ${flink.version} provided 示例

实现每周期产生一条数据(仅包含一个字段,类型为INT,初始值为1,每周期加1),周期时长为60s,通过入参指定。

1 2 3 4 5 6 7 8 9CREATE SOURCE STREAM user_in_data ( count INT ) WITH ( type = "user_defined", type_class_name = "mySourceSink.MySource", type_class_parameter = "60" ) TIMESTAMP BY car_timestamp.rowtime;

自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3